package com.udf.flink.scala.apitest.watermark

import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/**
 * Demo job for window allowed lateness.
 *
 * Reads lines of the form `"<key> <epochSeconds>"` from a socket, assigns event-time
 * timestamps with a 5 second bounded-out-of-orderness watermark, groups the records into
 * 5 second tumbling windows per key and keeps each window alive for another 5 seconds of
 * allowed lateness, so that late records re-fire the window.
 */
object AllowedLateTest {
  /**
   * Builds and runs the streaming job.
   *
   * @param args command line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Event time has to be enabled explicitly: on Flink versions where processing time is
    // the default, `timeWindow` would otherwise create processing-time windows and the
    // watermarks / allowed lateness configured below would have no effect.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val stream = env.socketTextStream("Linux1", 9999, '\n')
    val s = stream
      .map(line => {
        // Expected input: "<key> <epochSeconds>"; the timestamp is converted to milliseconds.
        val arr = line.split(" ")
        (arr(0), arr(1).toLong * 1000)
      })
      //      .assignAscendingTimestamps(_._2)
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(String, Long)](Time.seconds(5)) {
        override def extractTimestamp(element: (String, Long)): Long = element._2
      })
      .keyBy(_._1)
      // Tumbling windows: [0,5), [5,10), ...
      .timeWindow(Time.seconds(5))
      // Once the watermark passes the window end, the window fires but is NOT purged.
      // Once the watermark passes window end + allowed lateness, the window is purged;
      // until then every late element re-fires the window with an updated result.
      .allowedLateness(Time.seconds(5))
      .process(new MyAllowedLateProcess)
    s.print()
    env.execute()
  }

  /**
   * Emits one message per window firing, distinguishing the first firing of a window from
   * re-firings caused by late elements.
   *
   * The "already fired" flag is kept in per-window state, so every window of a key starts
   * with a fresh flag. (Keyed state obtained from the runtime context is shared by all
   * windows of the same key and would mark every window after the first one as an update.)
   */
  class MyAllowedLateProcess extends ProcessWindowFunction[(String, Long),
    String, String, TimeWindow] {

    // Descriptor of the per-window flag; shared by `process` and `clear`.
    private lazy val updateDescriptor =
      new ValueStateDescriptor[Boolean]("update", Types.of[Boolean])

    /**
     * Called on every firing of a window.
     *
     * @param key      key of the window being evaluated
     * @param context  window context; used to access the per-window state
     * @param elements all elements currently held by the window (not inspected here)
     * @param out      collector receiving the message for this firing
     */
    override def process(key: String,
                         context: Context,
                         elements: Iterable[(String, Long)],
                         out: Collector[String]): Unit = {
      val isUpdate = context.windowState.getState(updateDescriptor)
      // An unset flag reads as `false`, i.e. this is the first firing of the window.
      if (!isUpdate.value()) {
        out.collect("在水位线超过窗口结束时间的时候，窗口第一次闭合计算")
        isUpdate.update(true)
      } else {
        out.collect("迟到元素来了以后，更新窗口闭合计算的结果")
      }
    }

    /**
     * Releases the per-window flag when the window is purged, so the state does not leak.
     *
     * @param context context of the window being purged
     */
    override def clear(context: Context): Unit =
      context.windowState.getState(updateDescriptor).clear()
  }
}